{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## map(func)\n",
    "\n",
    "Retorna um novo RDD formado pela passagem de cada elemento do RDD de origem através de uma da função `func`.\n",
    "\n",
    "Exemplo:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]\n"
     ]
    }
   ],
   "source": [
    "data = sc.parallelize(range(1, 11))\n",
    "\n",
    "def duplicar(x): return x*x\n",
    "\n",
    "# data é um rdd\n",
    "res = data.map( duplicar )\n",
    "\n",
    "print (res.collect())\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## filter(func)\n",
    "\n",
    "Retorna um novo RDD formado pela seleção daqueles elemento do RDD de origem que, quando passados para função `func`, retorna `true`.\n",
    "\n",
    "Exemplo:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[1, 3, 5, 7, 9]\n"
     ]
    }
   ],
   "source": [
    "data = sc.parallelize(range(1, 11))\n",
    "\n",
    "res = data.filter(lambda x: x%2 ==1)\n",
    "\n",
    "print(res.collect())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## flatMap(func)\n",
    "\n",
    "Semelhante ao map, porém cada item de entrada pode ser mapeado para `0` ou mais itens de saída (assim, func deve retornar uma lista em vez de um único item).\n",
    "\n",
    "Exemplo:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "map: [['Linha', '1'], ['Linha', '2']]\n",
      "flatMap: ['Linha', '1', 'Linha', '2']\n"
     ]
    }
   ],
   "source": [
    "data = sc.parallelize([\"Linha 1\", \"Linha 2\"])\n",
    "\n",
    "def partir(l): return l.split(\" \")\n",
    "\n",
    "print ('map:', data.map(partir).collect())\n",
    "\n",
    "print ('flatMap:', data.flatMap(partir).collect())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## intersection(otherRDD)\n",
    "\n",
    "Retorna um novo RDD que contém a interseção dos elementos no RDD de origem e o outro RDD (argumento).\n",
    "\n",
    "Exemplo:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[0, 18, 12, 6]\n"
     ]
    }
   ],
   "source": [
    "two_multiples = sc.parallelize(range(0, 20, 2))\n",
    "\n",
    "three_multiples = sc.parallelize(range(0, 20, 3))\n",
    "\n",
    "print (two_multiples.intersection(three_multiples).collect())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## groupByKey()\n",
    "\n",
    "Quando chamado em um RDD de pares `(K, V)`, retorna um conjunto de dados de pares `(K, Iterable<V>)`.\n",
    "\n",
    "Exemplo:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "b [2, 5]\n",
      "c [3]\n",
      "a [1, 2, 3]\n"
     ]
    }
   ],
   "source": [
    "data = sc.parallelize([ ('a', 1), ('b', 2), ('c', 3) , ('a', 2), ('b', 5), ('a', 3)])\n",
    "\n",
    "for pair in data.groupByKey().collect():\n",
    "    print (pair[0], list(pair[1]))\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## reduceByKey(func)\n",
    "\n",
    "Quando chamado em um RDD de pares `(K, V)`, retorna um RDD de pares `(K, V)` onde os valores de cada chave são agregados usando a função de redução `func`, que deve ser do tipo `(V, V): V` (recebe 2 valores e retorna um novo valor).\n",
    "\n",
    "Exemplo:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[('b', 7), ('c', 3), ('a', 6)]\n"
     ]
    }
   ],
   "source": [
    "data = sc.parallelize([ ('a', 1), ('b', 2), ('c', 3) , ('a', 2), ('b', 5), ('a', 3)])\n",
    "\n",
    "res = data.reduceByKey( lambda x,y: x+y )\n",
    "\n",
    "print (res.collect())\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## sortByKey([asceding])\n",
    "\n",
    "Quando chamado em um RDD de pares `(K, V)` em que `K` é ordenável, retorna um RDD de pares `(K, V)` ordenados por chaves em ordem ascendente ou descendente, conforme especificado no argumento `ascending`.\n",
    "\n",
    "Exemplo:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 36,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[('c', 3), ('b', 2), ('b', 5), ('a', 1), ('a', 2), ('a', 3)]\n"
     ]
    }
   ],
   "source": [
    "data = sc.parallelize([ ('a', 1), ('b', 2), ('c', 3) , ('a', 2), ('b', 5), ('a', 3)])\n",
    "\n",
    "print(data.sortByKey(ascending=False).collect())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Links:\n",
    "* Lista mais completa com mais funções comuns: http://spark.apache.org/docs/1.6.3/programming-guide.html#transformations\n",
    "* Documentação da API RDD do Spark: http://spark.apache.org/docs/1.6.3/api/python/pyspark.html#pyspark.RDD"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Apache Toree - PySpark",
   "language": "python",
   "name": "apache_toree_pyspark"
  },
  "language_info": {
   "codemirror_mode": "text/x-ipython",
   "file_extension": ".py",
   "mimetype": "text/x-ipython",
   "name": "python",
   "pygments_lexer": "python",
   "version": "3.6.3\n"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 1
}
